5ae2c7cbc66aa187b063b89535ccbe0fdded4673,cdap-app-templates/cdap-etl/cdap-data-pipeline/src/test/java/co/cask/cdap/datapipeline/DataPipelineTest.java,DataPipelineTest,testInnerJoin,#,734

Before Change


    Assert.assertEquals(expected, actual);

    validateMetric(2, appId, "testJoiner.records.out");
    validateMetric(2, appId, "sink1.records.in");
  }

  @Test

After Change


    String input2Name = "source2InnerJoinInput-" + engine;
    String input3Name = "source3InnerJoinInput-" + engine;
    String outputName = "innerJoinOutput-" + engine;
    String joinerName = "innerJoiner-" + engine;
    String sinkName = "innerJoinSink-" + engine;
    ETLBatchConfig etlConfig = ETLBatchConfig.builder("* * * * *")
      .addStage(new ETLStage("source1", MockSource.getPlugin(input1Name)))
      .addStage(new ETLStage("source2", MockSource.getPlugin(input2Name)))
      .addStage(new ETLStage("source3", MockSource.getPlugin(input3Name)))
      .addStage(new ETLStage("t1", FieldsPrefixTransform.getPlugin("", inputSchema1.toString())))
      .addStage(new ETLStage("t2", FieldsPrefixTransform.getPlugin("", inputSchema2.toString())))
      .addStage(new ETLStage("t3", FieldsPrefixTransform.getPlugin("", inputSchema3.toString())))
      .addStage(new ETLStage(joinerName, MockJoiner.getPlugin("t1.customer_id=t2.cust_id=t3.c_id&" +
                                                                  "t1.customer_name=t2.cust_name=t3.c_name",
                                                                "t1,t2,t3", "")))
      .addStage(new ETLStage(sinkName, MockSink.getPlugin(outputName)))
      .addConnection("source1", "t1")
      .addConnection("source2", "t2")
      .addConnection("source3", "t3")
      .addConnection("t1", joinerName)
      .addConnection("t2", joinerName)
      .addConnection("t3", joinerName)
      .addConnection(joinerName, sinkName)
      .setEngine(engine)
      .build();

    AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(APP_ARTIFACT, etlConfig);
    Id.Application appId = Id.Application.from(Id.Namespace.DEFAULT, "JoinerApp");
    ApplicationManager appManager = deployApplication(appId, appRequest);

    Schema outSchema = Schema.recordOf(
      "join.output",
      Schema.Field.of("customer_id", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("customer_name", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("item_id", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("item_price", Schema.of(Schema.Type.LONG)),
      Schema.Field.of("cust_id", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("cust_name", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("t_id", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("c_id", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("c_name", Schema.of(Schema.Type.STRING))
    );

    StructuredRecord recordSamuel = StructuredRecord.builder(inputSchema1).set("customer_id", "1")
      .set("customer_name", "samuel").build();
    StructuredRecord recordBob = StructuredRecord.builder(inputSchema1).set("customer_id", "2")
      .set("customer_name", "bob").build();
    StructuredRecord recordJane = StructuredRecord.builder(inputSchema1).set("customer_id", "3")
      .set("customer_name", "jane").build();

    StructuredRecord recordCar = StructuredRecord.builder(inputSchema2).set("item_id", "11").set("item_price", 10000L)
      .set("cust_id", "1").set("cust_name", "samuel").build();
    StructuredRecord recordBike = StructuredRecord.builder(inputSchema2).set("item_id", "22").set("item_price", 100L)
      .set("cust_id", "3").set("cust_name", "jane").build();

    StructuredRecord recordTrasCar = StructuredRecord.builder(inputSchema3).set("t_id", "1").set("c_id", "1")
      .set("c_name", "samuel").build();
    StructuredRecord recordTrasBike = StructuredRecord.builder(inputSchema3).set("t_id", "2").set("c_id", "3")
      .set("c_name", "jane").build();

    // write the input records to each source (three to source1, two each to source2 and source3)
    DataSetManager<Table> inputManager = getDataset(Id.Namespace.DEFAULT, input1Name);
    MockSource.writeInput(inputManager, ImmutableList.of(recordSamuel, recordBob, recordJane));
    inputManager = getDataset(Id.Namespace.DEFAULT, input2Name);
    MockSource.writeInput(inputManager, ImmutableList.of(recordCar, recordBike));
    inputManager = getDataset(Id.Namespace.DEFAULT, input3Name);
    MockSource.writeInput(inputManager, ImmutableList.of(recordTrasCar, recordTrasBike));

    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
    workflowManager.start();
    workflowManager.waitForFinish(5, TimeUnit.MINUTES);


    StructuredRecord joinRecordSamuel = StructuredRecord.builder(outSchema)
      .set("customer_id", "1").set("customer_name", "samuel")
      .set("item_id", "11").set("item_price", 10000L).set("cust_id", "1").set("cust_name", "samuel")
      .set("t_id", "1").set("c_id", "1").set("c_name", "samuel").build();

    StructuredRecord joinRecordJane = StructuredRecord.builder(outSchema)
      .set("customer_id", "3").set("customer_name", "jane")
      .set("item_id", "22").set("item_price", 100L).set("cust_id", "3").set("cust_name", "jane")
      .set("t_id", "2").set("c_id", "3").set("c_name", "jane").build();

    DataSetManager<Table> sinkManager = getDataset(outputName);
    Set<StructuredRecord> expected = ImmutableSet.of(joinRecordSamuel, joinRecordJane);
    Set<StructuredRecord> actual = Sets.newHashSet(MockSink.readOutput(sinkManager));
    Assert.assertEquals(expected, actual);

    validateMetric(2, appId, joinerName + ".records.out");
    validateMetric(2, appId, sinkName + ".records.in");
  }